Big Data Finance Pipeline

Real-time cryptocurrency and FX data processing

Adam Kaniasty, Igor Kołodziej

Agenda

  • Project Overview: What we’re building
  • Solution & Architecture: System design and technology stack
  • Data Sources: Binance WebSocket and NBP API
  • Data Ingestion: NiFi flows for crypto and FX data
  • Data Storage: HDFS three-layer architecture
  • Data Processing: ETL transformations
  • Analytics: Returns, correlations, and aggregations
  • Serving Layer: Hive SQL and HBase fast access
  • Results: End-to-end pipeline verification

Project Overview

Project Goals:

  • Build an end-to-end Big Data pipeline using Apache ecosystem
  • Handle real-time data streams (Binance WebSocket) and batch data (NBP API)
  • Implement three-layer data architecture (Raw → Curated → Aggregated)
  • Process and analyze financial data (cryptocurrency and FX rates)
  • Provide multiple query interfaces (SQL via Hive, fast reads via HBase)

Technical Requirements:

  • Ingestion: Real-time WebSocket streams and scheduled API polling
  • Storage: Distributed file system (HDFS) with partitioning strategy
  • Processing: ETL transformations using Apache Spark
  • Analytics: Compute returns, correlations, and aggregations
  • Serving: SQL interface (Hive) and fast random access (HBase)
  • Verification: Automated end-to-end testing pipeline

The Solution

Big Data Pipeline Architecture:

graph LR
    Binance["Binance<br/>WebSocket"] --> NiFi["Apache<br/>NiFi"]
    NBP["NBP<br/>API"] --> NiFi
    NiFi --> Kafka["Kafka<br/>(Optional)"]
    Kafka --> HDFS["HDFS<br/>Storage"]
    NiFi --> HDFS
    HDFS --> Spark["Apache<br/>Spark"]
    Spark --> Hive["Apache<br/>Hive"]
    Spark --> HBase["Apache<br/>HBase"]

Key Capabilities:

  • Unified ingestion for real-time and batch data
  • Distributed storage with partitioning
  • Scalable processing and analytics
  • Multiple query interfaces

Technology Stack

Core Components:

  • Apache NiFi 1.27.0 - Data orchestration
  • Apache Kafka (Confluent Platform 7.5.0) - Message buffering
  • Apache HDFS - Distributed file storage
  • Apache Spark 3.5.1 - Data processing
  • Apache Hive 2.3.2 - SQL interface
  • Apache HBase (latest tag) - Fast serving layer

Data Sources: Binance WebSocket

Binance WebSocket:

  • Endpoint: wss://stream.binance.com:9443/stream
  • Stream: btcusdt@aggTrade
  • Frequency: Real-time
  • Format: JSON

Sample Data:

{
  "e": "aggTrade",
  "E": 1672515782136,
  "s": "BTCUSDT",
  "a": 12345,
  "p": "16800.00",
  "q": "0.001",
  "f": 100,
  "l": 105,
  "T": 1672515782000,
  "m": false
}
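
The short field codes above map to the normalized names used downstream. A minimal plain-Python sketch of that mapping (the target field names mirror the NiFi enrichment step, but the function itself is illustrative, not the pipeline's actual code):

```python
import json

# Binance aggTrade payloads use single-letter keys; map them to the
# normalized names the pipeline uses downstream.
FIELD_MAP = {"s": "symbol", "p": "price", "q": "quantity", "T": "event_time"}

def normalize_trade(raw: str) -> dict:
    msg = json.loads(raw)
    record = {name: msg[key] for key, name in FIELD_MAP.items()}
    record["price"] = float(record["price"])        # prices arrive as strings
    record["quantity"] = float(record["quantity"])  # quantities too
    return record

sample = ('{"e":"aggTrade","E":1672515782136,"s":"BTCUSDT","a":12345,'
          '"p":"16800.00","q":"0.001","f":100,"l":105,"T":1672515782000,"m":false}')
print(normalize_trade(sample))
```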

Data Sources: NBP API

NBP API:

  • Endpoint: http://api.nbp.pl/api/exchangerates/tables/A/
  • Frequency: Daily (business days)
  • Format: JSON array

Sample Data:

{
  "table": "A",
  "no": "001/A/NBP/2026",
  "effectiveDate": "2026-01-02",
  "rates": [
    {
      "currency": "dolar amerykański",
      "code": "USD",
      "mid": 3.6868
    },
    {
      "currency": "euro",
      "code": "EUR",
      "mid": 4.0123
    }
  ]
}
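
Each table-A response is later split into one record per currency and named per currency/date. A plain-Python sketch of that split, assuming the response has already been fetched (the record fields and filename pattern follow the NiFi flow described later):

```python
import json

def split_rates(table_json: str) -> list:
    """Split an NBP table-A response into one record per currency,
    naming each file nbp_rate_{CODE}_{DATE}.json."""
    table = json.loads(table_json)
    date = table["effectiveDate"]
    records = []
    for rate in table["rates"]:
        records.append({
            "filename": f"nbp_rate_{rate['code']}_{date}.json",
            "currency": rate["currency"],
            "code": rate["code"],
            "mid": float(rate["mid"]),
            "effective_date": date,
        })
    return records

payload = json.dumps({
    "table": "A", "no": "001/A/NBP/2026", "effectiveDate": "2026-01-02",
    "rates": [{"currency": "dolar amerykański", "code": "USD", "mid": 3.6868},
              {"currency": "euro", "code": "EUR", "mid": 4.0123}],
})
for rec in split_rates(payload):
    print(rec["filename"], rec["mid"])
```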

Data Ingestion: NiFi - Crypto Flow

Processor Details:

  1. WebSocket: Establishes a persistent connection to the Binance WebSocket API and subscribes to the btcusdt@aggTrade stream for real-time trade events
  2. ValidateRecord: Validates the JSON schema: checks required fields (e, E, s, a, p, q, T) and ensures data types are correct
  3. UpdateRecord: Enriches data: extracts symbol from s, price from p, quantity from q, event_time from T; normalizes field names for downstream processing
  4. UpdateAttribute: Adds partitioning metadata: extracts date, hour, and minute from the timestamp and sets HDFS path attributes for efficient storage organization
  5. PublishKafkaRecord: Publishes validated records to the Kafka topic crypto-trades for buffering and resilience
  6. ConsumeKafkaRecord: Consumes records from the Kafka topic, ensuring reliable delivery even if downstream processors are temporarily unavailable
  7. MergeRecord: Aggregates records into one-minute windows, combining multiple trade events into a single CSV record per minute
  8. PutHDFS: Writes merged CSV files to the HDFS raw layer with date/hour partitioning

Flow File: crypto-flow-kafka-4.json (stored in nifi/backups/)
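
The partitioning metadata from step 4 can be sketched in a few lines of Python: derive the date/hour HDFS path from the trade's T timestamp (milliseconds since epoch, assumed UTC; the base path matches the raw-layer layout shown on the storage slide):

```python
from datetime import datetime, timezone

def raw_path(event_time_ms: int, base: str = "/data/finance/raw/crypto-trades") -> str:
    """Derive the date/hour-partitioned HDFS path from the Binance T field
    (milliseconds since epoch, interpreted as UTC)."""
    ts = datetime.fromtimestamp(event_time_ms / 1000, tz=timezone.utc)
    return f"{base}/date={ts:%Y-%m-%d}/hour={ts:%H}/"

# The sample trade from the earlier slide lands in the 2022-12-31, hour-19 partition.
print(raw_path(1672515782000))
```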

Crypto Flow

Data Ingestion: NiFi - NBP Flow

Processor Details:

  1. InvokeHTTP: Fetches daily exchange rates from the NBP API endpoint http://api.nbp.pl/api/exchangerates/tables/A/ on a schedule (daily, business days)
  2. Extract Metadata: Extracts table metadata: table number, no (publication number), and effectiveDate from the API response
  3. SplitJson: Splits the rates array into individual JSON records, one per currency
  4. Extract Currency Fields: Extracts specific fields: currency (full name), code (ISO code such as USD or EUR), and mid (exchange rate value)
  5. UpdateAttribute: Sets HDFS attributes: extracts the date from effectiveDate and constructs the filename as nbp_rate_{CODE}_{DATE}.json
  6. PutHDFS: Writes individual currency records to the HDFS raw layer with date partitioning

Flow File: NBP.json (stored in nifi/backups/)

NBP Flow

Data Storage: HDFS

Three-Layer Architecture:

  • Raw Layer CSV/JSON - Partitioned by date/hour
  • Curated Layer Parquet - Hourly aggregates (crypto), Daily rates (FX)
  • Aggregated Layer Parquet - Daily returns, correlations, Monthly facts

HDFS Directory Structure:

graph TD
    Root["/data/finance/"] --> Raw["raw/"]
    Root --> Curated["curated/"]
    Root --> Agg["aggregated/"]
    
    Raw --> CryptoRaw["crypto-trades/<br/>date=YYYY-MM-DD/<br/>hour=HH/"]
    Raw --> NbpRaw["nbp/<br/>date=YYYY-MM-DD/"]
    
    Curated --> CryptoCur["crypto/<br/>trades_agg_hourly/"]
    Curated --> NbpCur["nbp/<br/>fx_daily/"]
    
    Agg --> Daily["daily/"]
    Agg --> Monthly["monthly/"]

Data Processing: ETL

Crypto ETL (etl_crypto.py):

  1. Read CSV from raw layer
  2. Cast and validate data types
  3. Group by symbol, date, hour
  4. Compute aggregates: price_open, price_close, price_high, price_low, price_avg, price_p95, volume_base, volume_quote, trade_count
  5. Write Parquet to curated layer
  6. Partitioned by date and hour

NBP ETL (etl_nbp.py):

  1. Read JSON from raw layer
  2. Extract date from filename (YYYYMMDD format)
  3. Cast mid (rate) to double
  4. Handle duplicates (keep latest by load_ts)
  5. Write Parquet to curated layer
  6. Partitioned by year and month

Output: Standardized, validated, partitioned Parquet files
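
The hourly aggregation at the heart of the crypto ETL (step 4) can be sketched in plain Python; the real job is a Spark groupBy in etl_crypto.py, and this toy version assumes trades arrive sorted by event time so first/last map to open/close (price_p95 is omitted for brevity):

```python
from collections import defaultdict

def hourly_aggregates(trades):
    """Group trades by (symbol, date, hour) and compute the curated-layer
    metrics. Plain-Python sketch of the Spark aggregation."""
    groups = defaultdict(list)
    for t in trades:
        groups[(t["symbol"], t["date"], t["hour"])].append(t)
    out = {}
    for key, rows in groups.items():
        prices = [r["price"] for r in rows]
        out[key] = {
            "price_open": prices[0],
            "price_close": prices[-1],
            "price_high": max(prices),
            "price_low": min(prices),
            "price_avg": sum(prices) / len(prices),
            "volume_base": sum(r["quantity"] for r in rows),
            "volume_quote": sum(r["price"] * r["quantity"] for r in rows),
            "trade_count": len(rows),
        }
    return out

trades = [
    {"symbol": "BTCUSDT", "date": "2026-01-05", "hour": 10, "price": 16800.0, "quantity": 0.001},
    {"symbol": "BTCUSDT", "date": "2026-01-05", "hour": 10, "price": 16820.0, "quantity": 0.002},
]
agg = hourly_aggregates(trades)[("BTCUSDT", "2026-01-05", 10)]
print(agg["price_open"], agg["price_close"], agg["trade_count"])
```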

Analytics: Spark

FX Analytics (analytics_fx_crypto.py):

  1. Read from curated fx_daily table (Parquet format)
  2. Use window functions partitioned by currency code, ordered by date
  3. Compute lagged value: mid_lag = LAG(mid) OVER (PARTITION BY code ORDER BY fx_date)
  4. Calculate daily returns: ret_1d = (mid / mid_lag) - 1.0
  5. Write output to aggregated layer partitioned by year/month
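
The LAG-and-divide pattern from steps 3-4 is easy to see in plain Python (the Spark job uses a window function partitioned by code; this sketch just tracks the previous mid per currency):

```python
def daily_returns(rows):
    """Per-currency daily returns: ret_1d = mid / LAG(mid) - 1.
    Plain-Python sketch of the window computation; the first row
    per currency has no lag, so its return is None (null)."""
    out = []
    prev = {}  # last mid seen per currency code
    for row in sorted(rows, key=lambda r: (r["code"], r["fx_date"])):
        mid_lag = prev.get(row["code"])
        ret = (row["mid"] / mid_lag) - 1.0 if mid_lag is not None else None
        out.append({**row, "ret_1d": ret})
        prev[row["code"]] = row["mid"]
    return out

rows = [
    {"code": "USD", "fx_date": "2026-01-02", "mid": 3.6868},
    {"code": "USD", "fx_date": "2026-01-05", "mid": 3.7000},
]
for r in daily_returns(rows):
    print(r["fx_date"], r["ret_1d"])
```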

Analytics: Spark

Crypto Analytics:

  1. Extract daily closes from hourly aggregates (select max price_close per symbol/date)
  2. Use window functions partitioned by symbol, ordered by date
  3. Calculate daily returns: ret_1d = (close / close_lag) - 1.0
  4. Write daily closes and returns to aggregated layer
  5. Compute monthly facts: average returns, volatility (stddev), trading days, last close
  6. Write monthly aggregations to aggregated layer
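
The monthly facts from step 5 reduce to a handful of statistics per symbol and month; a plain-Python sketch (assuming the rows are one month of one symbol, sorted by date, and that volatility means the sample standard deviation of daily returns):

```python
import statistics

def monthly_facts(daily_rows):
    """Monthly facts for one symbol: mean return, volatility (sample
    stddev of ret_1d), trading-day count, and last close."""
    returns = [r["ret_1d"] for r in daily_rows if r["ret_1d"] is not None]
    return {
        "avg_ret": statistics.mean(returns),
        "volatility": statistics.stdev(returns) if len(returns) > 1 else None,
        "trading_days": len(daily_rows),
        "last_close": daily_rows[-1]["close"],
    }

rows = [
    {"date": "2026-01-05", "close": 16800.0, "ret_1d": None},
    {"date": "2026-01-06", "close": 16968.0, "ret_1d": 0.01},
    {"date": "2026-01-07", "close": 16628.6, "ret_1d": -0.02},
]
print(monthly_facts(rows))
```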

Analytics: Spark

Correlation Analysis:

  1. Join BTCUSDT daily returns with USD/PLN daily returns on date
  2. Compute the 63-day rolling Pearson correlation using window functions
  3. Emit null where fewer than 63 days of history are available
  4. Write output to aggregated layer
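
The rolling Pearson correlation, including the null-until-full-window behavior, can be sketched in plain Python (the pipeline uses window=63; the toy example below shrinks the window to 3 so the output is visible on short series):

```python
import math

def rolling_corr(xs, ys, window=63):
    """Rolling Pearson correlation of two aligned return series;
    None until a full window of history is available."""
    out = []
    for i in range(len(xs)):
        if i + 1 < window:
            out.append(None)
            continue
        wx = xs[i + 1 - window:i + 1]
        wy = ys[i + 1 - window:i + 1]
        mx, my = sum(wx) / window, sum(wy) / window
        cov = sum((a - mx) * (b - my) for a, b in zip(wx, wy))
        var_x = sum((a - mx) ** 2 for a in wx)
        var_y = sum((b - my) ** 2 for b in wy)
        out.append(cov / math.sqrt(var_x * var_y))
    return out

# Perfectly correlated toy series, 3-day window for illustration.
btc = [0.01, -0.02, 0.03, 0.01]
pln = [0.02, -0.04, 0.06, 0.02]
print(rolling_corr(btc, pln, window=3))
```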

Serving Layer: Hive

Apache Hive Overview:

  • SQL-on-Hadoop engine providing SQL interface over distributed data
  • Uses external tables pointing to Parquet files in HDFS
  • Automatically discovers partitions using MSCK REPAIR TABLE
  • Enables ad-hoc queries without writing Spark code

External Tables:

  • trades_agg_hourly - Hourly crypto aggregates (partitioned by date, hour)
  • fx_daily - Daily FX rates (partitioned by year, month)
  • fx_daily_returns - FX daily returns (partitioned by year, month)
  • crypto_daily - Crypto daily closes and returns (partitioned by symbol)
  • corr_btc_usdpln_63d - BTC/USDPLN 63-day rolling correlation
  • crypto_monthly_facts - Monthly crypto statistics (partitioned by symbol)

Serving Layer: Hive

Key Features:

  • SQL Interface: Standard SQL queries over Parquet data in HDFS
  • Partition Pruning: Automatically filters partitions based on WHERE clauses
  • Schema-on-Read: Tables defined by CREATE EXTERNAL TABLE statements
  • No Data Movement: Queries execute directly on HDFS data

Example Query:

SELECT fx_date, code, mid, ret_1d
FROM finance.fx_daily_returns
WHERE year = 2026 AND month = 1
ORDER BY fx_date DESC, code
LIMIT 10;

Serving Layer: HBase

Facts Table: finance:facts_daily

  • Row key design: Salt + Symbol + Date (e.g., 7|BTCUSDT|20260105)
  • Column families: metrics, price, volume, correlation
  • Stored metrics: metrics:ret_1d, price:avg, volume:sum, volume:count, correlation:btc_usdpln_63d
  • Loading process: load_hbase_facts.py reads from Spark analytics outputs and generates HBase shell put commands
  • Execution: hbase/load-facts.sh executes the generated commands to populate the table
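
The salted row key can be sketched as below. The Salt|Symbol|Date layout comes from the slide; the salt derivation is an assumption here (a deterministic hash of symbol+date modulo a bucket count, chosen to spread sequential dates across regions), since the slide does not specify how load_hbase_facts.py computes it:

```python
def row_key(symbol: str, date: str, buckets: int = 16) -> str:
    """Build a salted HBase row key in the Salt|Symbol|Date layout.
    Salt derivation is illustrative: sum of character codes mod buckets."""
    compact = date.replace("-", "")  # 2026-01-05 -> 20260105
    salt = sum(ord(c) for c in symbol + compact) % buckets
    return f"{salt}|{symbol}|{compact}"

print(row_key("BTCUSDT", "2026-01-05"))
```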

Features:

  • Fast random reads: Millisecond latency for point queries by row key
  • Use case: Real-time lookups, API serving, point queries
  • Optimized for: Single-row retrievals rather than full table scans

Results

End-to-End Pipeline Verification (Test Date: 2026-01-12)

Test Script: test-full-pipeline.sh

Automated verification of the complete pipeline:

  1. Spark ETL → Transforms raw CSV/JSON to curated Parquet
  2. Spark Analytics → Computes returns, correlations, monthly facts
  3. Analytics Sanity Checks → Validates data quality and consistency
  4. Hive Tables → Registers external tables for SQL queries
  5. Hive Views → Creates analytics views for aggregated data
  6. HBase Facts → Loads selected metrics for fast random access

Execution Results:

Data Ingestion:

  • CSV files written to HDFS raw layer
  • JSON files written to HDFS raw layer
  • NiFi processors: All executed successfully

ETL Output:

  • Crypto: CSV → Parquet hourly aggregates (curated layer)
  • NBP: JSON → Parquet daily rates (curated layer)

Results

Analytics Verified:

  • FX daily returns: 27 currency pairs computed
  • Crypto daily: 1 symbol (BTCUSDT) aggregated
  • Correlation: Requires 63 days minimum (insufficient data)

Hive Tables Registered:

  • trades_agg_hourly - Hourly crypto aggregates
  • fx_daily - Daily FX rates
  • fx_daily_returns - FX daily returns
  • crypto_daily - Crypto daily closes and returns

HBase Facts:

  • Table finance:facts_daily loaded and queryable

Status: All 6 test stages completed successfully

Thank You